AkkaのActorでBroadcast
概要
AkkaのActorは、ScalaのActorsと異なり、ActorSystemという根幹システムを持っている。
これによって、Actor間を跨いだコントロールや、システム同士のコントロールが可能になっている。
Actors(2.9.x以前のScalaのActor)だと、この辺が無かったので、
Broadcast = メッセージが全Actorに届く とかしたい場合、自前でobject置くとかして対処してた。
要は、機構として、Centralみたいなものが無くて、自前で実装しなきゃいけなかった、と。
その辺、1WくらいでざっとJavaから使えるようにしたのがこの辺
https://github.com/sassembla/ScalaMessengerPrototype
で、Akkaで書き直してJava?なにそれ?な感じになったものを使ってはや三ヶ月。
月日が経つのは速い。
もうActorsに関しては何もかも無かった事にしていいと思う。
で、
AkkaのActorでBroadcastな仕組みを作ってみる
例として今回のお題、複数のActor間でメッセージを解釈したい場合、
AkkaのActorでは、EventBusを使ってpub-subが実装できる。
参考:http://doc.akka.io/docs/akka/2.1-M2/scala/event-bus.html
(Akka Router のBroadcastはこれとは異なった用途に使用する)
参考:http://doc.akka.io/docs/akka/snapshot/scala/routing.html
こんな感じ。
SampleBroadcasterMain.scala
import akka.actor._
case class Message(body:String)
class SampleActor extends Actor {
import java.util.UUID
val myId = UUID.randomUUID.toString
def receive = {
case message:Message => println("I am "+myId+" /message:"+message.body)
}
}
object SampleBroadcasterMain {
def main(args: Array[String]) = {
//Genereate system
val system = ActorSystem("namespace")
//Create actor
val sub1 = system.actorOf(Props[SampleActor])
//add actor to system-subscriber's network
system.eventStream.subscribe(sub1, classOf[Message])
//add another one -the 2nd
system.eventStream.subscribe(system.actorOf(Props[SampleActor]), classOf[Message])
//and add another one -the 3rd
system.eventStream.subscribe(system.actorOf(Props[SampleActor]), classOf[Message])
//publish messeage from here to the all subscribers.
system.eventStream.publish(Message("hereComes! subscrivers!!"))
}
}
実行すると
17:06:50.341 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] I am 77175ffd-9823-4444-96ac-7dc6e7d250e4 /message:hereComes! subscrivers!!
17:06:50.342 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] I am 6166563a-c421-4ae4-a820-8fbdec895e2e /message:hereComes! subscrivers!!
17:06:50.343 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] I am 88b7d855-b0fa-4706-87fd-558760876024 /message:hereComes! subscrivers!!
見事に、登録したactorすべてにMessageオブジェクトが投げられている。
らっくーーー。
eventBusは、既存の用途としてDeadLetterとかに使っている。
それを外側からも使えるように、参加可能なようにしてある、という。
なるほど的な感じ。
他にも何が楽か
Logとかが楽になっています。
たとえばLoggerみたいなものを個別に積んだり、
LoggerActorみたいなのを作ってCentralPointみたいなものを自前で作ったりしてたのですが、
Akkaになると、ActorSystemに対してのLoggingの受け皿が取り付け可能なので、あとからLogをつけたり外したりがとても容易になりました。
全体像としては、
・LogActorをActorへとmixin + log出力するコードをActorの中に書く
・systemへと
systemへとlogListenerを追加
val logListener = system.actorOf(Props[SampleLogListener])
と、
logを入力したいActorへとtraitをmixim
class SampleActor extends Actor with ActorLogging {
import java.util.UUID
val myId = UUID.randomUUID.toString
def receive = {
case message:Message => {
println("I am "+myId+" /message:"+message.body)
log.info("I am "+myId+" /message:"+message.body)
}
}
}
で、LogListenerの本体はこんな感じ
class SampleLogListener extends Actor {
//log
import akka.event.Logging.InitializeLogger
import akka.event.Logging.LoggerInitialized
import akka.event.Logging.Error
import akka.event.Logging.Warning
import akka.event.Logging.Info
import akka.event.Logging.Debug
def receive = {
case InitializeLogger(_) ⇒ print("initilaized")
case Error(cause, logSource, logClass, message) ⇒ print("Err " + message)
case Warning(logSource, logClass, message) ⇒ print("War " + message)
case Info(logSource, logClass, message) ⇒ print("Inf " + message)
case Debug(logSource, logClass, message) ⇒ print("Deb " + message)
}
}
ってな感じなので、後付けもらくちん。
ログはこんな感じにでます。
18:25:55.925 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] [INFO] [01/20/2013 18:25:55.845] [namespace-akka.actor.default-dispatcher-4] [akka://namespace/user/$b] I am 77175ffd-9823-4444-96ac-7dc6e7d250e4 /message:hereComes! subscrivers!!
18:25:55.926 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] [INFO] [01/20/2013 18:25:55.849] [namespace-akka.actor.default-dispatcher-2] [akka://namespace/user/$d] I am 6166563a-c421-4ae4-a820-8fbdec895e2e /message:hereComes! subscrivers!!
18:25:55.926 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] [INFO] [01/20/2013 18:25:55.850] [namespace-akka.actor.default-dispatcher-3] [akka://namespace/user/$c] I am 88b7d855-b0fa-4706-87fd-558760876024 /message:hereComes! subscrivers!!
ここにfluentdとかへのつなぎ込みのコードを書けば、、、
あとは、、、わかるな、、?
おまけ ActorSystemへようこそ
logの中に、akka://namespace/user/ とかが出ました。
そう、このへんは、ActorSystemの名称なんです。さっき決めたやつ。
userから先に、actorが設置されている。
Supervisorとかの話につながる、ActorSystemの全体像が、この辺に関わる感じ。
詳しくは
http://doc.akka.io/docs/akka/2.0/general/actor-systems.html
サンプルとか
サンプルは下記にupしますた。
https://github.com/sassembla/AkkaSampleBroadcaster
フォルダで、
gradlew runJar -d とかやると、稼働する筈。